package student.convert

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
import student.convert.JsonParser.parseJsonToStockData
import util.RandDomData

/** Small driver job that exercises the [[RandDomData]] source.
  *
  * Every record emitted by the source is turned into an `(index, 1)` pair, pairs are
  * grouped by index and their counts are summed, and only results whose count is at
  * most 1 are printed.
  */
object TestRandomData {

  /** Builds the streaming pipeline and runs it.
    *
    * @param args command-line arguments; not read by this job
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Pair each record's index with an initial count of one.
    val indexedOnes = env
      .addSource(new RandDomData)
      .map(record => (record.index, 1))

    // Add up the counts of pairs sharing an index; the index itself is carried through.
    val countsPerIndex = indexedOnes
      .keyBy(pair => pair._1)
      .reduce((acc, next) => (acc._1, acc._2 + next._2))

    // Keep only results whose count has not gone above one.
    // NOTE(review): presumably intended to show each index only the first time it
    // appears — confirm against the intended output.
    countsPerIndex
      .filter(counted => counted._2 <= 1)
      .print()

    env.execute()
  }
}
